package com.study.netty.handler;

import com.study.utils.MsgUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Slf4j
@ChannelHandler.Sharable
@Component
public class TcpServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /*@Autowired
    private KafkaSender kafkaSender;*/

    @Autowired
    @Qualifier("businessGroup")
    private EventExecutorGroup businessGroup;

    /**
     * 使用
     * @param ctx
     * @param byteBuf
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
        String content = byteBuf.toString(StandardCharsets.UTF_8);

        log.info("TCP服务端接收到消息：{}",  content);


        ByteBuf buf = Unpooled.copiedBuffer("TCP已经接收到消息：".getBytes(StandardCharsets.UTF_8));

        businessGroup.execute(()->{
            try {
                /*kafkaSender.sendMessage("hello", content);*/
                send2client(ctx,buf.array());
            }catch(Throwable e) {
                log.error("TCP数据接收处理出错",e);
                ByteBuf err = Unpooled.copiedBuffer("系统错误：".getBytes(StandardCharsets.UTF_8));
                send2client(ctx,err.array());
            }
        });

    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("TCP数据接收处理出错：",cause);
    }

    /**
     * 返回消息给客户端
     * @param ctx
     * @param msg
     */
    void send2client(ChannelHandlerContext ctx, byte[] msg) {
        ByteBuf buf= Unpooled.buffer(msg.length+1);
        buf.writeBytes(msg);
        buf.writeByte(MsgUtil.DELIMITER);
        ctx.writeAndFlush(buf).addListener(future->{
            if(!future.isSuccess()) {
                log.error("TCP发送给客户端消息失败");
            }
        });
    }
}
